package UDF

import Source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector


/**
 * 表聚合
 */
object TableAggFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)


    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
      //选自字段作为时间戳
    ).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading) = t.timeStamp
      })
    val sensorTable = tableEnv.fromDataStream(dataStream
      , 'id, 'temperature, 'timeStamp.rowtime as 'ts)

    //Table API
    val top2Temp = new Top2Temp()
    val resultTable = sensorTable
      .groupBy('id)
      .flatAggregate(top2Temp('temperature) as('temp, 'rank))
      .select('id, 'temp, 'rank)

    resultTable.toRetractStream[Row].print("table")
    env.execute()
  }
}

//定义一个类来表示聚合状态
class Top2TempAcc {
  var highestTemp: Double = Double.MinValue
  var secondTemp: Double = Double.MinValue
}

class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc()

  //实现计算聚合结果的函数
  def accumulate(acc: Top2TempAcc, temp: Double) = {
    //判断当前温度值的大小
    if (temp > acc.highestTemp) {
      acc.secondTemp = acc.highestTemp
      acc.highestTemp = temp
    } else if (temp > acc.secondTemp) {
      acc.secondTemp = temp
    }
  }

  //实现一个输出结果的方法,全部数据处理好后，才会输出
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    out.collect(acc.highestTemp, 1)
    out.collect(acc.secondTemp, 2)
  }
}